Elasticsearh - usign python API

Import it and created client object


In [1]:
from elasticsearch import Elasticsearch, helpers, exceptions as es_exceptions
from elasticsearch.helpers import bulk,scan

es = Elasticsearch(['atlas-kibana.mwt2.org:9200'],timeout=60)

Check we can connect to the cluster


In [2]:
# define our address so we dont have to type it all the time
es.cluster.health()


Out[2]:
{'active_primary_shards': 13370,
 'active_shards': 18516,
 'active_shards_percent_as_number': 100.0,
 'cluster_name': 'atlas',
 'delayed_unassigned_shards': 0,
 'initializing_shards': 0,
 'number_of_data_nodes': 5,
 'number_of_in_flight_fetch': 0,
 'number_of_nodes': 10,
 'number_of_pending_tasks': 0,
 'relocating_shards': 0,
 'status': 'green',
 'task_max_waiting_in_queue_millis': 0,
 'timed_out': False,
 'unassigned_shards': 0}

Create a bunch of documents


In [3]:
import random
allEvents=[]
for e in range(1000):
    E=random.gauss(1000., 30.)
    pT=random.gauss(100., 50.)
    allEvents.append({ 'eventnr':e, 'E':E, 'pT':pT})

Index documents one by one

takes around 70 seconds.


In [4]:
from time import time
start=time()
for nr, event in enumerate(allEvents):
    es.create(index='my_events', doc_type='event', id=nr, body=event)
print("it took", time()-start,"seconds.")


it took 84.49716067314148 seconds.

Bulk indexing of same events


In [5]:
# clean all
try:
    es.indices.delete(index='my_events')
except:
    print("not there?")
    
start=time()
try:
   res = helpers.bulk(es, allEvents, index='my_events', doc_type='event', raise_on_exception=True,request_timeout=60)
except es_exceptions.ConnectionError as e:
   print('ConnectionError ', e)
except es_exceptions.TransportError as e:
   print('TransportError ', e)
except helpers.BulkIndexError as e:
   print(e[0])
   for i in e[1]:
      print(i)
except Exception as e:
   print('Something seriously wrong happened.',e)

print("it took", time()-start,"seconds.")


it took 5.286314964294434 seconds.

Search for a document


In [6]:
my_query={
    "size": 15,
    "query":{
       "bool":{
            "must":[
                {'range': {'pT': {'gte': 100, 'lt': 120}}},
                {"range" : { "E" :{'gte': 200 }}}
            ]
        }
    }
}

res = es.search(index='my_events', body=my_query )
for r in res['hits']['hits']:
    print(r)

In [7]:
my_query={
    "aggs" : {
        "pt_bins" : {
            "histogram" : {
                "field" : "pT",
                "interval" : 50
            }
        }
    }
}

res = es.search(index='my_events', body=my_query )
print(res['aggregations'])


{'pt_bins': {'buckets': []}}

Clean up


In [8]:
es.indices.delete(index='my_events')


Out[8]:
{'acknowledged': True}

In [ ]: